import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

/**
  * Reads order records from a text file and converts them into rating tuples
  * built on MLlib's [[org.apache.spark.mllib.recommendation.Rating]].
  *
  * Created by chenjianwen on 2016/3/2.
  *
  * @param path location of the order file, passed to `CFContext.sc.textFile`. Every line is
  *             expected to contain exactly five comma-separated fields:
  *             `orderId,userId,productIds,timestamp,isCancled`, where `productIds` is a
  *             `#`-separated list of product ids.
  */
class YCFOrderDataHandler(val path: String) {

  /**
    * Builds one rating per distinct (userId, productId) pair found in the order file.
    *
    * Every product of every order contributes one (userId, productId) -> timestamp record.
    * Records sharing the same pair are grouped, each timestamp is scored with
    * `scoreCaculate`, and the scores are averaged into a single rating.
    *
    * Lines that do not split into exactly five fields (blank lines included) and empty
    * product-id tokens are skipped instead of aborting the job. Non-numeric user ids,
    * product ids or timestamps still raise `NumberFormatException`.
    *
    * NOTE(review): the `isCancled` field is never read, so every order is treated the same
    * regardless of its value; confirm that this is intended.
    *
    * @param scoreCaculate maps an order timestamp to a score
    * @return a cached RDD of `(bucket, rating)` pairs, where `bucket` is the first grouped
    *         timestamp of the pair modulo 10 (presumably used by callers to split the
    *         data set; verify against the caller)
    */
  def ratingRdd(scoreCaculate: (Long) => Double): RDD[(Int, Rating)] = {
    CFContext.sc.textFile(path)
      .flatMap { line =>
        line.split(",") match {
          case Array(_, userId, productIds, timestamp, _) =>
            // One order may contain many products; each of them yields its own record.
            // Empty tokens (no products at all, or a stray '#') are dropped so that
            // they never reach toInt.
            productIds.split("#").toList
              .filter(_.nonEmpty)
              .map(productId => ((userId.toInt, productId.toInt), timestamp.toLong))
          case _ =>
            // Unexpected field count: skip the line rather than fail with a MatchError.
            List.empty[((Int, Int), Long)]
        }
      }
      .groupByKey()
      .map { case ((userId, productId), times) =>
        // Score every occurrence of the same (userId, productId) pair, then average.
        val totalScore = times.foldLeft(0.0)((sum, time) => sum + scoreCaculate(time))
        // Take the modulo on the Long before narrowing: converting first would overflow
        // for timestamps that do not fit in an Int and could produce a negative bucket.
        val bucket = (times.head % 10).toInt
        (bucket, Rating(userId, productId, totalScore / times.size))
      }
      .cache()
  }
}
